This post is a summary of my notes from reading Chapter 12 of Spark: The Definitive Guide.
Definition
An immutable, partitioned collection of records.
There is no concept of rows in an RDD; individual records are just Java/Scala/Python objects, and RDDs have no schema.
Virtually all Spark code you run compiles down to RDDs (see the sketch after this list).
Spark's Structured APIs automatically store data in an optimized, compressed binary format, whereas with RDDs you would have to implement that kind of format and optimization inside your objects yourself.
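One quick way to see this in practice (an illustration, not an example from the book): every DataFrame exposes its underlying RDD, and toDebugString prints that RDD's lineage.
// in Scala: inspect the RDD lineage behind a DataFrame transformation
val df = spark.range(10).toDF("id").filter("id > 5")
println(df.rdd.toDebugString)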
Difference between Datasets and RDDs of Case Classes
Datasets take advantage of the Catalyst optimizer and the optimized internal format.
Datasets do not need to serialize/deserialize the whole object for every operation (see the sketch below).
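The sketch assumes a hypothetical Person case class (not from the book) and contrasts the two representations.
// in Scala
import spark.implicits._

case class Person(name: String, age: Int)
val people = Seq(Person("Ann", 30), Person("Bob", 25))

// RDD of case classes: plain JVM objects, serialized as a whole, no optimizer involvement
val peopleRDD = spark.sparkContext.parallelize(people)

// Dataset of the same case class: stored in the optimized binary format,
// with operations planned by the optimizer
val peopleDS = people.toDS()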
When to Use the Low-Level APIs?
When you’re calling a DataFrame transformation, it actually just becomes a set of RDD transformations
You need some functionality that you cannot find in the higher-level APIs; for example, if you need very tight control over physical data placement across the cluster
You need to maintain some legacy codebase written using RDDs.
You need to do some custom shared variable manipulation (broadcast variables and accumulators; a sketch follows this list).
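"Shared variables" here means broadcast variables and accumulators, which the book covers in a later chapter. A minimal sketch (the names lookup and matches are just examples):
// in Scala
val lookup = spark.sparkContext.broadcast(Map("a" -> 1, "b" -> 2))
val matches = spark.sparkContext.longAccumulator("matches")
spark.sparkContext.parallelize(Seq("a", "b", "c")).foreach { key =>
  if (lookup.value.contains(key)) matches.add(1)
}
println(matches.value) // 2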
Types of RDDs
Generic RDD
Key-Value RDD, which adds key-based methods and custom partitioning by key (a small illustration follows)
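Mapping records into tuples turns a generic RDD into a key-value RDD and unlocks key-based methods (variable names below are just examples).
// in Scala
val nums = spark.sparkContext.parallelize(1 to 10)  // generic RDD[Int]
val pairs = nums.map(n => (n % 2, n))               // key-value RDD[(Int, Int)]
pairs.reduceByKey(_ + _).collect()                  // key-based methods are now available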
Properties
Preferred locations: optionally, a list of preferred locations on which to compute each split (e.g., block locations for a Hadoop Distributed File System [HDFS] file)
Partitioner: optionally, a Partitioner for key-value RDDs (e.g., to say that the RDD is hash-partitioned)
Dependencies: a list of dependencies on other RDDs
Compute function: a function for computing each split
Partitions: a list of partitions
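These five properties correspond to members of Spark's abstract RDD class; the sketch below paraphrases the signatures (see org.apache.spark.rdd.RDD for the authoritative definitions).
// in Scala: a sketch of how the five properties surface in the RDD abstraction
import org.apache.spark.{Dependency, Partition, Partitioner, TaskContext}

abstract class SketchRDD[T] {
  protected def getPartitions: Array[Partition]                            // a list of partitions
  def compute(split: Partition, context: TaskContext): Iterator[T]         // a function for computing each split
  protected def getDependencies: Seq[Dependency[_]]                        // dependencies on other RDDs
  val partitioner: Option[Partitioner] = None                              // optional Partitioner for key-value RDDs
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil // optional preferred locations per split
}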
Creating RDDs
Interoperating Between DataFrames, Datasets, and RDDs
Dataset[T]→RDD[T]
// in Scala: converts a Dataset[Long] to RDD[Long]
spark.range(500).rdd
DataFrame -> RDD[Row]
Because a DataFrame is a Dataset of Row objects, the resulting RDD is an RDD[Row]. To operate on this data, you will need to convert the Row objects to the correct data type or extract values out of them.
// in Scala: DataFrame -> RDD[Long]
spark.range(10).toDF().rdd.map(rowObject => rowObject.getLong(0))
RDD -> DataFrame
// in Scala
spark.range(10).rdd.toDF()
From a Local Collection
// in Scala
val myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
val words = spark.sparkContext.parallelize(myCollection, 2)
words.setName("myWords")
filter
// in Scala
def startsWithS(individual: String) = {
  individual.startsWith("S")
}
words.filter(word => startsWithS(word)).collect()
val words2 = words.map(word => (word, word(0), word.startsWith("S")))
words2.filter(record => record._3).take(5)
sort
To sort an RDD, use the sortBy method: specify a function to extract a value from each record and sort based on that value.
words.sortBy(word => word.length() * -1).take(2)
distinct
words.distinct().count()
map
Applies the given function to each record: one record in, one record out.
// in Scala
val words2 = words.map(word => (word, word(0), word.startsWith("S")))
flatMap
Takes a function that returns an Iterable; sometimes each input record should produce multiple output records, and flatMap flattens those results.
// in Scala
words.flatMap(word => word.toSeq).take(5)
RandomSplit
randomly split an RDD into an Array of RDDs
// in Scala
val fiftyFiftySplit = words.randomSplit(Array[Double](0.5, 0.5))
Actions
Actions either collect data to the driver or write to an external data source
reduce
You can use the reduce method to specify a function to “reduce” an RDD of any kind of value to one value
// in Scala
spark.sparkContext.parallelize(1 to 20).reduce(_ + _) // 210
// in Scala
def wordLengthReducer(leftWord: String, rightWord: String): String = {
  if (leftWord.length > rightWord.length)
    return leftWord
  else
    return rightWord
}
words.reduce(wordLengthReducer)
take
words.take(5)
words.takeOrdered(5)
words.top(5)
val withReplacement = true
val numberToTake = 6
val randomSeed = 100L
words.takeSample(withReplacement, numberToTake, randomSeed)
take and its derivative methods take a number of values from your RDD. This works by first scanning one partition and then using the results from that partition to estimate the number of additional partitions needed to satisfy the limit.
count
words.count()
val confidence = 0.95
val timeoutMilliseconds = 400
// returns an approximate count if the computation exceeds the timeout
words.countApprox(timeoutMilliseconds, confidence)
// takes a relative accuracy as its argument
words.countApproxDistinct(0.05)
// loads the result set into the driver's memory
words.countByValue()
words.countByValueApprox(1000, 0.95)
first, max, and min
words.first()
spark.sparkContext.parallelize(1 to 20).max()
spark.sparkContext.parallelize(1 to 20).min()
The first method returns the first value in the dataset, while max and min return the maximum and minimum values, respectively.
Saving Files
Saving files means writing to plain-text files. With RDDs, you cannot actually “save” to a data source in the conventional sense. You must iterate over the partitions in order to save the contents of each partition to some external database. This is a low-level approach that reveals the underlying operation that is being performed in the higher-level APIs.
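For plain text, the book uses saveAsTextFile; each partition is written out as a separate part file (the path below is just an example).
// in Scala
words.saveAsTextFile("file:/tmp/bookTitle")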
Caching
You can cache or persist an RDD; by default, cache() stores it in memory only, while persist lets you specify a StorageLevel.
// in Scala
import org.apache.spark.storage.StorageLevel
words.getStorageLevel
words.cache()
words.persist(StorageLevel.MEMORY_AND_DISK)
Checkpointing
check pointing is the act of saving an RDD to disk so that future references to this RDD point to those intermediate partitions on disk rather than recomputing the RDD from its original source
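The book's example sets a checkpoint directory on the SparkContext and then checkpoints the RDD (the directory below is just an example path).
// in Scala
spark.sparkContext.setCheckpointDir("/some/path/for/checkpointing")
words.checkpoint()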
mapPartitions
mapPartitions lets you operate on an entire partition at a time; in the example below, the value 1 is produced for every partition, so summing the results counts the number of partitions.
// in Scala
words.mapPartitions(part => Iterator[Int](1)).sum() // 2.0, because words has two partitions
// a standalone mapPartitions example that runs with a local SparkContext
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer

val sc = new SparkContext(
  new SparkConf().setAppName("map_mapPartitions_demo").setMaster("local"))
val arrayRDD = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9))
arrayRDD.mapPartitions(elements => {
  val result = new ArrayBuffer[Int]()
  elements.foreach(element => {
    result += element
  })
  result.iterator
}).foreach(println)
foreachPartition
foreachPartition is similar to mapPartitions but produces no return value, which makes it a good fit for side effects such as writing each partition out to an external source.
// in Scala
words.foreachPartition { iter =>
  import java.io._
  import scala.util.Random
  val randomFileName = new Random().nextInt()
  val pw = new PrintWriter(new File(s"/tmp/random-file-${randomFileName}.txt"))
  while (iter.hasNext) {
    pw.write(iter.next())
  }
  pw.close()
}
Glom
glom is an interesting function that takes every partition in your dataset and converts them to arrays. This can be useful if you’re going to collect the data to the driver and want to have an array for each partition
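The book's example parallelizes two words into two partitions and collects them back as one array per partition.
// in Scala
spark.sparkContext.parallelize(Seq("Hello", "World"), 2).glom().collect()
// Array(Array(Hello), Array(World))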